/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.transforms;



import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.CannotProvideCoderException;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData.Builder;

import com.bff.gaia.unified.sdk.transforms.display.HasDisplayData;

import com.bff.gaia.unified.sdk.util.NameUtils;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.io.TextIO;

import com.bff.gaia.unified.sdk.transforms.join.CoGroupByKey;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionList;

import com.bff.gaia.unified.sdk.values.PCollectionTuple;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.sdk.values.PInput;

import com.bff.gaia.unified.sdk.values.POutput;

import com.bff.gaia.unified.sdk.values.PValue;

import com.bff.gaia.unified.sdk.values.TupleTag;



import javax.annotation.Nullable;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.io.Serializable;

import java.util.Collections;

import java.util.Map;



/**

 * A {@code PTransform<InputT, OutputT>} is an operation that takes an {@code InputT} (some subtype

 * of {@link PInput}) and produces an {@code OutputT} (some subtype of {@link POutput}).

 *

 * <p>Common PTransforms include root PTransforms like {@link TextIO.Read},

 * {@link Create}, processing and conversion operations like {@link ParDo}, {@link GroupByKey},

 * {@link CoGroupByKey}, {@link Combine}, and {@link Count}, and

 * outputting PTransforms like {@link TextIO.Write}. Users also define their

 * own application-specific composite PTransforms.

 *

 * <p>Each {@code PTransform<InputT, OutputT>} has a single {@code InputT} type and a single {@code

 * OutputT} type. Many PTransforms conceptually transform one input value to one output value, and

 * in this case {@code InputT} and {@code OutputT} are typically instances of {@link

 * PCollection}. A root PTransform conceptually has no input; in this

 * case, conventionally a {@link PBegin} object produced by calling

 * {@link Pipeline#begin} is used as the input. An outputting PTransform conceptually has no output;

 * in this case, conventionally {@link PDone} is used as its output type.

 * Some PTransforms conceptually have multiple inputs and/or outputs; in these cases special

 * "bundling" classes like {@link PCollectionList}, {@link

 * PCollectionTuple} are used to combine multiple values into a single

 * bundle for passing into or returning from the PTransform.

 *

 * <p>A {@code PTransform<InputT, OutputT>} is invoked by calling {@code apply()} on its {@code

 * InputT}, returning its {@code OutputT}. Calls can be chained to concisely create linear pipeline

 * segments. For example:

 *

 * <pre>{@code

 * PCollection<T1> pc1 = ...;

 * PCollection<T2> pc2 =

 *     pc1.apply(ParDo.of(new MyDoFn<T1,KV<K,V>>()))

 *        .apply(GroupByKey.<K, V>create())

 *        .apply(Combine.perKey(new MyKeyedCombineFn<K,V>()))

 *        .apply(ParDo.of(new MyDoFn2<KV<K,V>,T2>()));

 * }</pre>

 *

 * <p>PTransform operations have unique names, which are used by the system when explaining what's

 * going on during optimization and execution. Each PTransform gets a system-provided default name,

 * but it's a good practice to specify a more informative explicit name when applying the transform.

 * For example:

 *

 * <pre>{@code

 * ...

 * .apply("Step1", ParDo.of(new MyDoFn3()))

 * ...

 * }</pre>

 *

 * <p>Each PCollection output produced by a PTransform, either directly or within a "bundling"

 * class, automatically gets its own name derived from the name of its producing PTransform.

 *

 * <p>Each PCollection output produced by a PTransform also records a {@link

 * Coder} that specifies how the elements of that PCollection are to be

 * encoded as a byte string, if necessary. The PTransform may provide a default Coder for any of its

 * outputs, for instance by deriving it from the PTransform input's Coder. If the PTransform does

 * not specify the Coder for an output PCollection, the system will attempt to infer a Coder for it,

 * based on what's known at run-time about the Java type of the output's elements. The enclosing

 * {@link Pipeline}'s {@link CoderRegistry} (accessible via {@link

 * Pipeline#getCoderRegistry}) defines the mapping from Java types to the default Coder to use, for

 * a standard set of Java types; users can extend this mapping for additional types, via {@link

 * CoderRegistry#registerCoderProvider}. If this inference process fails,

 * either because the Java type was not known at run-time (e.g., due to Java's "erasure" of generic

 * types) or there was no default Coder registered, then the Coder should be specified manually by

 * calling {@link PCollection#setCoder} on the output PCollection. The Coder of every output

 * PCollection must be determined one way or another before that output is used as an input to

 * another PTransform, or before the enclosing Pipeline is run.

 *

 * <p>A small number of PTransforms are implemented natively by the Apache Unified SDK; such

 * PTransforms simply return an output value as their apply implementation. The majority of

 * PTransforms are implemented as composites of other PTransforms. Such a PTransform subclass

 * typically just implements {@link #expand}, computing its Output value from its {@code InputT}

 * value. User programs are encouraged to use this mechanism to modularize their own code. Such

 * composite abstractions get their own name, and navigating through the composition hierarchy of

 * PTransforms is supported by the monitoring interface. Examples of composite PTransforms can be

 * found in this directory and in examples. From the caller's point of view, there is no distinction

 * between a PTransform implemented natively and one implemented in terms of other PTransforms; both

 * kinds of PTransform are invoked in the same way, using {@code apply()}.

 *

 * <h3>Note on Serialization</h3>

 *

 * <p>{@code PTransform} doesn't actually support serialization, despite implementing {@code

 * Serializable}.

 *

 * <p>{@code PTransform} is marked {@code Serializable} solely because it is common for an anonymous

 * {@link DoFn} instance to be created within an {@code apply()} method of a composite {@code

 * PTransform}.

 *

 * <p>Each of those {@code *Fn}s is {@code Serializable}, but unfortunately its instance state will

 * contain a reference to the enclosing {@code PTransform} instance, and so attempt to serialize the

 * {@code PTransform} instance, even though the {@code *Fn} instance never references anything about

 * the enclosing {@code PTransform}.

 *

 * <p>To allow such anonymous {@code *Fn}s to be written conveniently, {@code PTransform} is marked

 * as {@code Serializable}, and includes dummy {@code writeObject()} and {@code readObject()}

 * operations that do not save or restore any state.

 *

 * @see <a href= "https://unified.apache.org/documentation/programming-guide/#transforms" >Applying

 *     Transformations</a>

 * @param <InputT> the type of the input to this PTransform

 * @param <OutputT> the type of the output of this PTransform

 */

public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    implements Serializable /* nominal only; see the serialization note above */, HasDisplayData {

  // PTransform is only nominally Serializable (see the serialization note in the class
  // documentation), which is why all of its instance state is declared transient.

  /**
   * The explicit name supplied when this {@code PTransform} was constructed, or {@code null} if
   * none was given.
   */
  @Nullable
  protected final transient String name;

  /** Creates a {@code PTransform} without an explicit name. */
  protected PTransform() {
    this(null);
  }

  /**
   * Creates a {@code PTransform} with the given name.
   *
   * @param name the name to report from {@link #getName}, or {@code null} to fall back to {@link
   *     #getKindString}
   */
  protected PTransform(@Nullable String name) {
    this.name = name;
  }

  /**
   * Override this method to specify how this {@code PTransform} should be expanded on the given
   * {@code InputT}.
   *
   * <p>NOTE: This method should not be called directly. Instead, the {@code PTransform} should be
   * applied to the {@code InputT} using the {@code apply} method.
   *
   * <p>Composite transforms, which are defined in terms of other transforms, should return the
   * output of one of the composed transforms. Non-composite transforms, which do not apply any
   * transforms internally, should return a new unbound output and register evaluators (via
   * backend-specific registration methods).
   *
   * @param input the input this transform is being applied to
   * @return the output produced by this transform for {@code input}
   */
  public abstract OutputT expand(InputT input);

  /**
   * Called before running the Pipeline to verify this transform is fully and correctly specified.
   *
   * <p>The default implementation is a no-op.
   *
   * @param options the pipeline options, possibly {@code null}
   */
  public void validate(@Nullable PipelineOptions options) {}

  /**
   * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that
   * are independent of the expansion of the {@code InputT} within {@link #expand(PInput)}.
   *
   * <p>For example, this can contain any side input consumed by this {@link PTransform}.
   *
   * <p>The default implementation returns an empty map.
   */
  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
    return Collections.emptyMap();
  }

  /**
   * Returns the transform name: the explicit name if one was supplied at construction, otherwise
   * the result of {@link #getKindString}.
   *
   * <p>This name is provided by the transform creator and is not required to be unique.
   */
  public String getName() {
    if (name == null) {
      return getKindString();
    }
    return name;
  }

  /////////////////////////////////////////////////////////////////////////////

  /**
   * Returns the kind string alone when no explicit name was given, and {@code "<name> [<kind>]"}
   * otherwise.
   */
  @Override
  public String toString() {
    if (name != null) {
      return getName() + " [" + getKindString() + "]";
    }
    return getKindString();
  }

  /**
   * Returns the name to use by default for this {@code PTransform} (not including the names of any
   * enclosing {@code PTransform}s).
   *
   * <p>By default, returns {@code "AnonymousTransform"} for anonymous classes and otherwise the
   * name that {@link NameUtils#approximatePTransformName} derives from this {@code PTransform}'s
   * class.
   *
   * <p>The caller is responsible for ensuring that names of applied {@code PTransform}s are unique,
   * e.g., by adding a uniquifying suffix when needed.
   */
  protected String getKindString() {
    return getClass().isAnonymousClass()
        ? "AnonymousTransform"
        : NameUtils.approximatePTransformName(getClass());
  }

  /**
   * Serialization hook that deliberately writes nothing.
   *
   * <p>Serializable anonymous DoFns are often nested within a PTransform and drag the enclosing
   * instance along; the PTransform itself is not meant to be serialized, so no state is saved.
   */
  private void writeObject(ObjectOutputStream oos) {
    // Intentionally empty: no PTransform state is written.
  }

  /**
   * Deserialization hook that deliberately reads nothing, mirroring {@link
   * #writeObject(ObjectOutputStream)}.
   */
  private void readObject(ObjectInputStream ois) {
    // Intentionally empty: no PTransform state is restored.
  }

  /**
   * Returns the default {@code Coder} to use for the output of this single-output {@code
   * PTransform}.
   *
   * <p>By default, always throws.
   *
   * @throws CannotProvideCoderException if no coder can be inferred
   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
   *     returned PCollection.
   */
  @Deprecated
  protected Coder<?> getDefaultOutputCoder() throws CannotProvideCoderException {
    throw new CannotProvideCoderException("PTransform.getOutputCoder called.");
  }

  /**
   * Returns the default {@code Coder} to use for the output of this single-output {@code
   * PTransform} when applied to the given input.
   *
   * <p>By default, ignores {@code input} and delegates to {@link #getDefaultOutputCoder()}, which
   * always throws unless overridden.
   *
   * @throws CannotProvideCoderException if none can be inferred.
   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
   *     returned PCollection.
   */
  @Deprecated
  protected Coder<?> getDefaultOutputCoder(@SuppressWarnings("unused") InputT input)
      throws CannotProvideCoderException {
    return getDefaultOutputCoder();
  }

  /**
   * Returns the default {@code Coder} to use for the given output of this single-output {@code
   * PTransform} when applied to the given input.
   *
   * <p>By default, ignores {@code output} and delegates to {@link
   * #getDefaultOutputCoder(PInput)}, which always throws unless overridden.
   *
   * @throws CannotProvideCoderException if none can be inferred.
   * @deprecated Instead, the PTransform should explicitly call {@link PCollection#setCoder} on the
   *     returned PCollection.
   */
  @Deprecated
  public <T> Coder<T> getDefaultOutputCoder(
      InputT input, @SuppressWarnings("unused") PCollection<T> output)
      throws CannotProvideCoderException {
    // The single-argument overload only knows Coder<?>; the cast to the caller's element type is
    // unchecked, so the suppression is kept on this one local.
    @SuppressWarnings("unchecked")
    Coder<T> typedCoder = (Coder<T>) getDefaultOutputCoder(input);
    return typedCoder;
  }

  /**
   * {@inheritDoc}
   *
   * <p>By default, does not register any display data. Implementors may override this method to
   * provide their own display data.
   */
  @Override
  public void populateDisplayData(Builder builder) {}

  /**
   * Wraps a {@code SerializableFunction<InputT, OutputT>} {@code fn} as a {@code PTransform} whose
   * {@link #expand} returns {@code fn.apply(input)}.
   *
   * <p>Allows users to define a concise composite transform using a Java 8 lambda expression. For
   * example:
   *
   * <pre>{@code
   * PCollection<String> words = wordsAndErrors.apply(
   *   (PCollectionTuple input) -> {
   *     input.get(errorsTag).apply(new WriteErrorOutput());
   *     return input.get(wordsTag);
   *   });
   * }</pre>
   *
   * @param fn the function to invoke when the returned transform is expanded
   * @return an unnamed transform delegating to {@code fn}
   */
  @Experimental
  public static <InputT extends PInput, OutputT extends POutput>
      PTransform<InputT, OutputT> compose(SerializableFunction<InputT, OutputT> fn) {
    // Kept as an anonymous class so getKindString() still reports "AnonymousTransform".
    return new PTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return fn.apply(input);
      }
    };
  }

  /**
   * Like {@link #compose(SerializableFunction)}, but with a custom name.
   *
   * @param name the name given to the returned transform
   * @param fn the function to invoke when the returned transform is expanded
   * @return a transform named {@code name} delegating to {@code fn}
   */
  @Experimental
  public static <InputT extends PInput, OutputT extends POutput>
      PTransform<InputT, OutputT> compose(String name, SerializableFunction<InputT, OutputT> fn) {
    // Kept as an anonymous class so getKindString() still reports "AnonymousTransform".
    return new PTransform<InputT, OutputT>(name) {
      @Override
      public OutputT expand(InputT input) {
        return fn.apply(input);
      }
    };
  }
}